Streamlit in Snowflake で時系列予測の結果を可視化してみた #SnowflakeDB
はじめに
Streamlit in Snowflake と Snowflake Cortex ML-Based Functions を組み合わせて使ってみる機会があったので、Streamlit in Snowflake によるアプリ作成について確認しつつ時系列予測の結果を可視化して確認できる Streamlit アプリを作成してみました。
事前準備
データベース・スキーマの作成
はじめに以下のコマンドでサンプルデータ、アプリ、予測モデルの格納先となるデータベース・スキーマを作成しました。あわせて今回使用するウェアハウスも用意しています。
--データベース・スキーマの作成
USE ROLE SYSADMIN;
CREATE DATABASE streamlit_db;
CREATE SCHEMA streamlit_db.data;
CREATE SCHEMA streamlit_db.app;
CREATE SCHEMA streamlit_db.models;
--ウェアハウスの作成
CREATE WAREHOUSE streamlit_wh
WAREHOUSE_SIZE = small
WAREHOUSE_TYPE = standard
AUTO_SUSPEND = 60
AUTO_RESUME = True
INITIALLY_SUSPENDED = True;
使用データ
サンプルとして以下のデータを使用しました。このうち Orders テーブルを CSV ファイルに変換し使用します。
データロード
データのロードには、Snowsight からファイルをアップロードできる機能を使用しました。この機能を使用すると最大 250MB のサイズまでであれば、ブラウザ経由でファイルのアップロード、スキーマ検出機能によるカラム名、データ型の推測を行いつつ、テーブルを作成できます。
カラム名に空欄やハイフン(-
)が含まれるものは、アンダースコア(_
)に置き換えてロードすることとしました。
アクセス制御
はじめに、streamlit_creator
の名称で今回の作業に必要な権限を持つロールを作成します。
主な権限として以下が必要です。
- Streamlit アプリの作成権限
- アプリの作成先スキーマに対する以下の権限
CREATE STREAMLIT
CREATE STAGE
- 親データベース、スキーマに対する
USAGE
- アプリの作成先スキーマに対する以下の権限
- FORECAST による予測モデルの作成権限
- モデルの作成先スキーマに対する
CREATE SNOWFLAKE.ML.FORECAST
- 親データベース、スキーマに対する
USAGE
- 対象のデータ(テーブルやビュー)に対する参照権限
- モデルの作成先スキーマに対する
その他も含め各種権限は以下のコマンドで付与しました。
--roleの作成
USE ROLE USERADMIN;
CREATE ROLE streamlit_creator;
--権限の付与
USE ROLE SECURITYADMIN;
----ロール階層
GRANT ROLE streamlit_creator TO ROLE SYSADMIN;
----ウェアハウスの使用権限
GRANT USAGE ON WAREHOUSE streamlit_wh TO ROLE streamlit_creator;
----データアクセス:データ格納スキーマへの使用権限、テーブル・ビューの作成権限
GRANT USAGE ON SCHEMA streamlit_db.data TO ROLE streamlit_creator;
GRANT SELECT ON ALL TABLES IN SCHEMA streamlit_db.data TO ROLE streamlit_creator;
GRANT CREATE TABLE ON SCHEMA streamlit_db.data TO ROLE streamlit_creator;
GRANT CREATE VIEW ON SCHEMA streamlit_db.data TO ROLE streamlit_creator;
----streamlit権限:データベース・アプリ作成先のスキーマへの使用権限、スキーマレベルのStreamlitアプリ作成に必要な権限
GRANT USAGE ON SCHEMA streamlit_db.app TO ROLE streamlit_creator;
GRANT USAGE ON DATABASE streamlit_db TO ROLE streamlit_creator;
GRANT CREATE STREAMLIT ON SCHEMA streamlit_db.app TO ROLE streamlit_creator;
GRANT CREATE STAGE ON SCHEMA streamlit_db.app TO ROLE streamlit_creator;
----予測モデルの作成に必要なスキーマレベルの権限と作成先スキーマへの使用権限
GRANT CREATE SNOWFLAKE.ML.FORECAST ON SCHEMA streamlit_db.models TO ROLE streamlit_creator;
GRANT USAGE ON SCHEMA streamlit_db.models TO ROLE streamlit_creator;
--権限確認
SHOW GRANTS TO ROLE streamlit_creator;
分析モデルの作成
元データは日付ごとの粒度なので、ここでは月単位に集約したテーブルとモデルが参照するためのビューを以下の手順で用意しました。
/*============================================
分析
==============================================*/
--元データが日付単位なので、ここでは月単位に集約
USE ROLE STREAMLIT_CREATOR;
USE WAREHOUSE STREAMLIT_WH;
USE SCHEMA STREAMLIT_DB.DATA;
CREATE OR REPLACE TABLE monthly_sales AS
SELECT
DATE_TRUNC('MONTH', TO_TIMESTAMP(order_date, 'YYYY/MM/DD')) AS order_year_month
,SUM(sales) AS total_sales
FROM
orders
GROUP BY
DATE_TRUNC('MONTH',TO_TIMESTAMP(order_date, 'YYYY/MM/DD'))
ORDER BY
order_year_month;
--データの期間を確認
SELECT order_year_month
FROM monthly_sales;
--model作成用に2016年までのデータを抽出
CREATE VIEW VIEW_MONTHLY_SALES AS
SELECT
*
FROM MONTHLY_SALES
WHERE YEAR(order_year_month) <= 2016;
SELECT * FROM VIEW_MONTHLY_SALES;
続けて以下のコマンドでモデルの作成を行いました。予測結果はMy_forecasts
テーブルとして作成しています。
-- モデルの構築
CREATE SNOWFLAKE.ML.FORECAST models.my_model(
INPUT_DATA => SYSTEM$REFERENCE('VIEW', 'VIEW_MONTHLY_SALES'),
TIMESTAMP_COLNAME => 'ORDER_YEAR_MONTH',
TARGET_COLNAME => 'TOTAL_SALES'
);
--作成したモデルの確認
USE SCHEMA STREAMLIT_DB.MODELS;
SHOW SNOWFLAKE.ML.FORECAST;
--予測:1年間予測
CALL my_model!FORECAST(
FORECASTING_PERIODS => 12,
CONFIG_OBJECT => {'prediction_interval': 0.95}
);
CREATE TABLE STREAMLIT_DB.DATA.My_forecasts AS SELECT * FROM TABLE(RESULT_SCAN(-1));
-- 予測結果
USE SCHEMA STREAMLIT_DB.DATA;
SELECT * FROM My_forecasts;
可視化用にモデル作成用のビューとユニオンし整形します。
-- モデル作成用のデータと予測結果をユニオン
SELECT
ORDER_YEAR_MONTH
,TOTAL_SALES
, 'Actual' AS type
, NULL AS LOWER_BOUND
, NULL AS UPPER_BOUND
FROM STREAMLIT_DB.DATA.VIEW_MONTHLY_SALES
UNION ALL
SELECT
TS as ORDER_YEAR_MONTH
,FORECAST AS TOTAL_SALES
,'Forecast' AS type
,LOWER_BOUND
,UPPER_BOUND
FROM STREAMLIT_DB.DATA.MY_FORECASTS;
この時点で下図のようなデータになっています。
Snowsigh 上で可視化すると下図のようになっており、予測結果を図で確認できます。
Snowflake Cortex ML-Based Functionsを使った時系列予測については以下でも紹介されていますので、こちらもご参照ください。
Streamli in Snowflake でアプリ化
上記のモデルを使用し、ここではサンプルとして以下の機能を持つ Streamlit アプリを作成してみます。
- ユーザーによるモデルの再構築が可能
- ユーザーによる予測期間の変更が可能
すでに Streamlit アプリ作成用の権限は付与しているので、ここでは Snowsight から対象のスキーマに作成します。
上図で [Create] をクリックすると、対象のスキーマ配下にステージが作成されます。
デフォルトでは以下のパッケージが含まれます。追加したいパッケージがある場合、検索追加も可能です。
ここでは以下のパッケージを使用することにしました。
アプリケーションには以下のコードを使用しました。
設定した要件を満たすように、ユーザーからのインプットで予測期間を変更しつつ、予測の実行、モデルの再構築もできるような構成です。
可視化には plotnine パッケージを使用し Python から ggplot2 による可視化を試してみました。
# Import python packages
import streamlit as st
from snowflake.snowpark.context import get_active_session
import pandas as pd
from plotnine import ggplot, aes, geom_line, geom_point, geom_ribbon, labs, theme, element_text
session = get_active_session()
pd.set_option("max_colwidth",None)
# 関数定義
def create_model():
cmd = f"""
CREATE OR REPLACE SNOWFLAKE.ML.FORECAST STREAMLIT_DB.MODELS.MY_MODEL(
INPUT_DATA => SYSTEM$REFERENCE('VIEW', 'STREAMLIT_DB.DATA.VIEW_MONTHLY_SALES'),
TIMESTAMP_COLNAME => 'ORDER_YEAR_MONTH',
TARGET_COLNAME => 'TOTAL_SALES');
"""
res_df = session.sql(cmd).collect()
return res_df
def exe_forecast(input_period):
cmd = f"""
CREATE OR REPLACE TABLE STREAMLIT_DB.DATA.MY_FORECASTS AS
SELECT * FROM TABLE(STREAMLIT_DB.MODELS.MY_MODEL!FORECAST(FORECASTING_PERIODS => {input_period}));
"""
df_predicted = session.sql(cmd).collect()
return df_predicted
def union_data():
cmd = """
SELECT
ORDER_YEAR_MONTH
,TOTAL_SALES
,'Actual' AS type
,NULL AS LOWER_BOUND
,NULL AS UPPER_BOUND
FROM STREAMLIT_DB.DATA.VIEW_MONTHLY_SALES
UNION ALL
SELECT
TS as ORDER_YEAR_MONTH
,FORECAST AS TOTAL_SALES
,'Forecast' AS type
,LOWER_BOUND
,UPPER_BOUND
FROM STREAMLIT_DB.DATA.MY_FORECASTS;
"""
df_plot = session.sql(cmd).to_pandas()
return df_plot
def get_forecasting_period():
cmd = """
SELECT
left(date_trunc('MONTH',min(ts)),10) as min_year_month
,left(date_trunc('MONTH',max(ts)),10) as max_year_month
,COUNT(*) as cnt
FROM STREAMLIT_DB.DATA.MY_FORECASTS;
"""
df_period = session.sql(cmd).to_pandas()
min_year_month = df_period['MIN_YEAR_MONTH'][0]
max_year_month = df_period['MAX_YEAR_MONTH'][0]
forecast_period_cnt = int(df_period['CNT'].iloc[0])
return forecast_period_cnt,min_year_month,max_year_month
def create_plot(df):
plot = (ggplot(df, aes(x='ORDER_YEAR_MONTH', y='TOTAL_SALES', color='TYPE'))
+ geom_line()
+ geom_point()
+ geom_ribbon(aes(ymin='LOWER_BOUND', ymax='UPPER_BOUND', fill='TYPE'), alpha=0.2,linetype='None')
+ labs(x='', y='TOTAL_SALES')
+ theme(axis_title_y=element_text(size=10),
axis_text=element_text(size=8),
axis_text_x=element_text(angle=90),
legend_title=element_text(size=8),
legend_text=element_text(size=8))
)
return plot
# 初期値の取得
if 'forecast_period_cnt' not in st.session_state:
st.session_state.forecast_period_cnt, st.session_state.min_year_month, st.session_state.max_year_month = get_forecasting_period()
# サイドバーコンテンツ
with st.sidebar:
st.write("■モデルを再構築")
if st.button("Create Model"):
with st.spinner('実行中です...'):
res = create_model()
st.success('Model creation successful!', icon="✅")
st.write(res)
st.write("■予測期間(月)を選択し [Execute Forecating] で結果を表示")
input_period = st.number_input("予測期間(月)",min_value=1, max_value=24, value=st.session_state.forecast_period_cnt, step=1)
if st.button("Execute Forecasting"):
with st.spinner('実行中です...'):
df_predicted = exe_forecast(input_period)
st.success('Forecasting successful!', icon="✅")
st.session_state.forecast_period_cnt, st.session_state.min_year_month, st.session_state.max_year_month = get_forecasting_period()
st.write(df_predicted)
# メインコンテンツ
st.title("Snowflake Cortex ML-based function Forecasting")
st.write("Snowflake Cortex ML-based function の一つである Forecasting による時系列の予測と予測結果の可視化")
st.write(f"■予測期間:{st.session_state.forecast_period_cnt} ヶ月({st.session_state.min_year_month} ~ {st.session_state.max_year_month})")
df = union_data()
plot = create_plot(df)
st.pyplot(plot.draw())
st.write(f"■予測データ")
st.dataframe(df)
アプリを実行すると下図のようになります。
アプリの共有
ここではアプリの閲覧用ロールを作成します。Streamlit アプリの使用には以下の権限が必要です。
- Streamlit アプリの使用権限
- アプリの作成先データベース・スキーマに対する
USAGE
- アプリに対する
USAGE
- アプリの作成先データベース・スキーマに対する
アプリ名はSHOW STREAMLITS
で確認できます。
以下のコマンドでアプリの閲覧用ロールを用意しました。
USE ROLE USERADMIN;
CREATE ROLE streamlit_user;
--権限の付与
USE ROLE SECURITYADMIN;
----ロール階層
GRANT ROLE streamlit_user TO ROLE SYSADMIN;
GRANT USAGE ON DATABASE streamlit_db TO ROLE streamlit_user;
GRANT USAGE ON SCHEMA streamlit_db.app TO ROLE streamlit_user;
GRANT USAGE on STREAMLIT streamlit_db.app.U2UHUTC95R2VOFSA to ROLE streamlit_user;
Streamlit in Snowflake について | Snowflake Documentation
Streamlit アプリの特徴として、アプリはその所有者権限で実行されます。そのため、閲覧用ロールにはデータに対する参照権限やモデルの作成権限等は付与していません。
所有者の権利と Streamlit in Snowflake アプリの理解 | Snowflake Documentation
作成したロールに切り替えてアプリを表示してみます。この場合、View Only としてアプリを表示することができます。
所有者権限で実行されるので、モデルの再構築が可能です。
同様に、予測期間の変更とその期間による予測の実行も可能です。Streamlit によりアプリ化しているので、表示内容が動的に変更されます。
さいごに
Streamlit in Snowflake で時系列予測結果を表示するアプリケーションを作成してみました。
今回は単純な時系列のみの予測なので、シンプルな構成でした。他の変数を考慮に入れる場合は、予測用のデータを用意する必要があるのでもう少し工夫が必要ですが、Streamlit in Snowflake により分析も可能な UI も提供できるので活用の幅が広がると感じました。
こちらの記事が何かの参考になれば幸いです。